package com.atguigu.flink.datastream.sink;

import com.atguigu.flink.func.ClickSource;
import com.atguigu.flink.pojo.Event;
import org.apache.commons.lang3.RandomUtils;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;

/**
 * @author WEIYUNHUI
 * @date 2023/6/14 11:25
 *
 * Kafka Connector:
 *  1. KafkaSource
 *  2. KafkaSink
 *       生产者:  KafkaProducer
 *       生产者配置:
 *          集群位置: bootstrap.servers
 *          Key的序列化器: key.serializer
 *          value的序列化器: value.serializer
 *          缓冲区大小:   buffer.memory
 *          每个分区的缓冲区大小（批次大小）: batch.size
 *          每批次超时时间:linger.ms
 *          应答级别: acks
 *          生产者事务超时时间: transaction.timeout.ms
 *          生产者事务id:  transactional.id
 *
 *       生产者分区分配策略：
 *             默认使用粘性分区分配策略：
 *                1. 如果明确指定了分区号，直接使用
 *                2. 如果没有指定分区号，但是指定了key， 按照key的hash值计算分区号
 *                3. 如果没有指定分区号和key， 直接使用粘性策略。
 *
 *      Kafka提供的生产者的配置类: ProducerConfig
 *
 */
public class Flink02_KafkaSink {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        env.enableCheckpointing(5000) ;

        DataStreamSource<Event> ds = env.addSource(new ClickSource());
        //将流中的数据写入到Kafka中
        KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
                .setBootstrapServers("hadoop102:9092, hadoop103:9092")
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.<String>builder()
                                .setTopic("topicA")
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build()
                )
                // EOS(Exactly Once semantic): 精确一次,
                // AT_LEAST_ONCE : 至少一次
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                // 如果将来使用精确一次，KafkaSink要求必须设置事务ID的前缀
                .setTransactionalIdPrefix("flink" + RandomUtils.nextInt(1, 100))
                //kafkaSink默认的生产者事务的超时时间为: 1 hour
                //KafkaBroker默认允许的事务的最大超时时间: 15 minutes
                //要求生产者事务的超时时间不能超过KafkaBroker允许的事务最大超时时间
                .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG , "36000")
                //如果还有其他的配置， 可以直接使用setProperty
                //.setProperty(ProducerConfig.ACKS_CONFIG, "-1")
                .build();

        //新的API
        ds.map( Event::toString ).sinkTo(kafkaSink) ;

        try {
            env.execute();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
